<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8">
<meta http-equiv="cache-control" content="no-cache">
<title>Genivia - The mq plugin for inbound message queueing and message replay</title>
<link href="genivia_tabs.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="jquery.js"></script>
<script type="text/javascript" src="dynsections.js"></script>
<link href="doxygen.css" rel="stylesheet" type="text/css">
<link href="genivia_content.css" rel="stylesheet" type="text/css">
</head>
<body>
<div id="top">
 <div id="titlearea">
  <table height="72px" width="100%" cellspacing="0" cellpadding="0">
   <tbody>
    <tr>
     <td width="10%">&nbsp;</td>
     <td width="175px"><a href="https://www.genivia.com"><img alt="Genivia" src="GeniviaLogo2_trans_noslogan.png"/></a></td>
     <td class="tab_home"><a href="https://www.genivia.com">Home</a></td>
     <td class="tab_home"><a href="https://www.genivia.com/docs.html">Documentation</a></td>
     <td>
      <div style="float: right; font-size: 18px; font-weight: bold;">The mq plugin for inbound message queueing and message replay</div>
      <br>
      <div style="float: right; font-size: 10px;">updated Sun Dec 9 2018 by Robert van Engelen</div>
     </td>
     <td width="10%">&nbsp;</td>
    </tr>
   </tbody>
  </table>
 </div>
<!-- Generated by Doxygen 1.8.11 -->
  <div id="navrow1" class="tabs">
    <ul class="tablist">
      <li><a href="index.html"><span>Main&#160;Page</span></a></li>
      <li class="current"><a href="pages.html"><span>Related&#160;Pages</span></a></li>
      <li><a href="annotated.html"><span>Classes</span></a></li>
      <li><a href="files.html"><span>Files</span></a></li>
    </ul>
  </div>
</div><!-- top -->
<div class="header">
  <div class="headertitle">
<div class="title">The mq plugin for inbound message queueing and message replay </div>  </div>
</div><!--header-->
<div class="contents">
<div class="textblock"><p>The inbound message queueing plugin can be used to queue messages that should not be discarded with the WS-RM protocol's NoDiscard behavior. Messages that are out of sequence as per WS-RM protocol and should be handled by one thread (or a thread pool) should be queued for later replay and service operation invocation. If an unlimited number of threads is available, the simplest WS-RM protocol NoDiscard behavior is implemented by starting a thread for each inbound message and letting the thread block with the <code><a class="el" href="wsrmapi_8h.html#af8e440bcc087dbd943b30a112f476193" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check_and_wait()</a></code> or <code><a class="el" href="wsrmapi_8h.html#a8641e119a993564112e0a0e4a1000d0d" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check_send_empty_response_and_wait()</a></code> calls. However, that approach is not efficient with HTTP keep-alive because the next messages on the keep-alive socket will be blocked from being processed. This plugin is designed to process messages on an HTTP keep-alive socket even when operations block.</p>
<h1><a class="anchor" id="mq_1"></a>
Server-Side Queueing of One-Way Messages</h1>
<p>Queueing one-way messages for internal replay is implemented with the message queueing plugin as follows, by queueing inbound messages received on a single socket and then replaying them all in sequence as received from the socket:</p>
<div class="fragment"><div class="line"><span class="preprocessor">#include &quot;<a class="code" href="mq_8h.html">mq.h</a>&quot;</span></div><div class="line"></div><div class="line"><span class="keywordtype">int</span> main()</div><div class="line">{</div><div class="line">  <span class="keyword">struct </span>soap *soap = soap_new1(SOAP_IO_KEEPALIVE);</div><div class="line">  soap_register_plugin(soap, <a class="code" href="mq_8h.html#a96bf20fae13f6f4318f1592b9d0adfba">soap_mq</a>);</div><div class="line">  ...</div><div class="line">  <span class="comment">// initializations, port bind etc.</span></div><div class="line">  ...</div><div class="line">  <span class="keywordflow">while</span> (soap_valid_socket(soap_accept(soap)))</div><div class="line">  {</div><div class="line">    <span class="comment">// queue all messages on this socket (socket is HTTP keep alive)</span></div><div class="line">    <span class="comment">// for each message received, we immediately respond with HTTP 202 Accepted</span></div><div class="line">    <span class="keyword">struct </span>ms_queue *queue = <a class="code" href="mq_8h.html#abffc350d4d0892c678b294badd572f11">soap_mq_queue</a>(soap);</div><div class="line">    <span class="keyword">struct </span>ms_msg *msg;</div><div class="line">    <span class="keywordflow">while</span> (<a class="code" href="mq_8h.html#a762d9aa6281902622b846135918b897f">soap_mq_get</a>(soap, queue))</div><div class="line">      soap_send_empty_response(soap, 202); <span class="comment">// 202 Accepted</span></div><div class="line"></div><div class="line">    <span class="comment">// we now internally replay all messages to invoke services</span></div><div class="line">    <span class="comment">// services are assumed to NOT send a response message back</span></div><div class="line">    <span class="comment">// i.e. one-way operations</span></div><div class="line">    <span class="keywordflow">for</span> (msg = <a class="code" href="mq_8h.html#a5efe3ffa00498c5b01ccdb5d447cffa2">soap_mq_begin</a>(queue); msg; msg = <a class="code" href="mq_8h.html#aa7609d8ff2851cb2b0237710216039aa">soap_mq_next</a>(msg))</div><div class="line">      soap_serve(&amp;msg-&gt;soap);</div><div class="line"></div><div class="line">    <span class="comment">// delete all queued messages, this also calls the following functions on each queued msg context:</span></div><div class="line">    <span class="comment">//   soap_destroy(&amp;msg-&gt;soap);</span></div><div class="line">    <span class="comment">//   soap_end(&amp;msg-&gt;soap);</span></div><div class="line">    <span class="comment">//   soap_done(&amp;msg-&gt;soap);</span></div><div class="line">    <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, NULL);</div><div class="line"></div><div class="line">    <span class="comment">// delete the queue (allocated in current context)</span></div><div class="line">    soap_destroy(soap);</div><div class="line">    soap_end(soap);</div><div class="line">  }</div><div class="line">  ...</div><div class="line">  <span class="comment">// finalize</span></div><div class="line">  ...</div><div class="line">  soap_free(soap);</div><div class="line">}</div></div><!-- fragment --><p>Alternatively, it is also possible to call <code>soap_mq_del(queue, msg)</code> after <code>soap_serve(&amp;msg-&gt;soap)</code> to immediately delete the message after processing. Calling <code>soap_mq_next(msg)</code> for the next loop iteration is still valid, of course.</p>
<h1><a class="anchor" id="mq_2"></a>
WS-RM Server-Side Message Queueing for NoDiscard Behavior with Callback Services</h1>
<p>When messages are controlled by the WS-ReliableMessaging protocol, we can keep the WS-RM messages in a queue that were received out of order until the order is restored and queued messages can be dispatched. This WS-RM behavior is desirable with WS-RM NoDiscard. To implement this approach, we use an inbound message queue for each socket accepted and processed by a thread.</p>
<div class="fragment"><div class="line"><span class="preprocessor">#include &quot;<a class="code" href="wsaapi_8h.html">wsaapi.h</a>&quot;</span></div><div class="line"><span class="preprocessor">#include &quot;<a class="code" href="wsrmapi_8h.html">wsrmapi.h</a>&quot;</span></div><div class="line"><span class="preprocessor">#include &quot;<a class="code" href="mq_8h.html">mq.h</a>&quot;</span></div><div class="line"><span class="preprocessor">#include &quot;threads.h&quot;</span></div><div class="line"></div><div class="line"><span class="keywordtype">int</span> main()</div><div class="line">{</div><div class="line">  <span class="keyword">struct </span>soap *soap = soap_new1(SOAP_IO_KEEPALIVE);</div><div class="line">  soap_register_plugin(soap, <a class="code" href="wsaapi_8h.html#aa013e3760b97c2efcc71d29b57394501">soap_wsa</a>);</div><div class="line">  soap_register_plugin(soap, <a class="code" href="wsrmapi_8h.html#a3ca1614f5da3589a41957cb2f93394dc">soap_wsrm</a>);</div><div class="line">  soap_register_plugin(soap, <a class="code" href="mq_8h.html#a96bf20fae13f6f4318f1592b9d0adfba">soap_mq</a>);</div><div class="line">  ...</div><div class="line">  <span class="comment">// initializations, port bind etc.</span></div><div class="line">  ...</div><div class="line">  <span class="keywordflow">while</span> (soap_valid_socket(soap_accept(soap)))</div><div class="line">  {</div><div class="line">    THREAD_TYPE tid;</div><div class="line">    <span class="keyword">struct </span>soap *tsoap = soap_copy(soap);</div><div class="line">    <span class="keywordflow">if</span> (!tsoap)</div><div class="line">      soap_closesock(soap);</div><div class="line">    <span class="keywordflow">else</span></div><div class="line">      <span class="keywordflow">while</span> (THREAD_CREATE(&amp;tid, (<span class="keywordtype">void</span>*(*)(<span class="keywordtype">void</span>*))process_request, (<span class="keywordtype">void</span>*)tsoap))</div><div class="line">        sleep(1);</div><div class="line">  }</div><div class="line">  ...</div><div class="line">  <span class="comment">// finalize</span></div><div class="line">  ...</div><div class="line">  soap_free(soap);</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> *process_request(<span class="keywordtype">void</span> *tsoap)</div><div class="line">{</div><div class="line">  <span class="keyword">struct </span>soap *soap = (<span class="keyword">struct </span>soap*)tsoap;</div><div class="line">  <span class="keyword">struct </span>ms_queue *queue = <a class="code" href="mq_8h.html#abffc350d4d0892c678b294badd572f11">soap_mq_queue</a>(soap);</div><div class="line">  <span class="keyword">struct </span>ms_msg *msg;</div><div class="line">  <span class="keyword">struct </span>soap ctx;</div><div class="line">  <span class="keywordflow">while</span> ((msg = <a class="code" href="mq_8h.html#a762d9aa6281902622b846135918b897f">soap_mq_get</a>(soap, queue)) != NULL)</div><div class="line">  {</div><div class="line">    <span class="comment">// parse the message headers, if NoDiscard then keep message in queue to retry later</span></div><div class="line">    <span class="comment">// copy the context, since we want to preserve the original to retry later</span></div><div class="line">    soap_copy_context(&amp;ctx, &amp;msg-&gt;soap);</div><div class="line"></div><div class="line">    <span class="keywordflow">if</span> (soap_begin_serve(&amp;ctx))</div><div class="line">    {</div><div class="line">      soap_send_fault(&amp;ctx); <span class="comment">// send fault, close socket</span></div><div class="line">      <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line">    }</div><div class="line">    <span class="keywordflow">else</span> <span class="keywordflow">if</span> (!ctx.header || !ctx.header-&gt;wsrm__Sequence)</div><div class="line">    {</div><div class="line">      <span class="comment">// this is not a WS-RM message, so serve immediately</span></div><div class="line">      soap_serve(&amp;msg-&gt;soap); <span class="comment">// service operations</span></div><div class="line">      <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line">    }</div><div class="line">    <span class="keywordflow">else</span> <span class="keywordflow">if</span> (!<a class="code" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb">soap_wsrm_check</a>(&amp;ctx))</div><div class="line">    {</div><div class="line">      <span class="comment">// check is OK, process this WS-RM message now</span></div><div class="line">      soap_serve(&amp;msg-&gt;soap); <span class="comment">// service operations SHOULD NOT call soap_wsrm_check()</span></div><div class="line">      <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line">    }</div><div class="line">    <span class="keywordflow">else</span> <span class="keywordflow">if</span> (ctx.error != SOAP_STOP)</div><div class="line">    {</div><div class="line">      <span class="comment">// check failed, not a WS-RM message or other WS-RM error</span></div><div class="line">      soap_send_fault(&amp;ctx); <span class="comment">// send fault, close socket</span></div><div class="line">      <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg); <span class="comment">// delete message from queue</span></div><div class="line">    }</div><div class="line">    soap_destroy(&amp;ctx);</div><div class="line">    soap_end(&amp;ctx);</div><div class="line">    soap_done(&amp;ctx);</div><div class="line">  }</div><div class="line">  <span class="comment">// as long as the queue is not empty and WS-RM sequence(s) not terminated, keep trying</span></div><div class="line">  <span class="keywordflow">while</span> ((msg = <a class="code" href="mq_8h.html#a5efe3ffa00498c5b01ccdb5d447cffa2">soap_mq_begin</a>(queue)) != NULL)</div><div class="line">  {</div><div class="line">    <span class="comment">// process queued WS-RM messages</span></div><div class="line">    <span class="keywordflow">for</span> (; msg != NULL; msg = <a class="code" href="mq_8h.html#aa7609d8ff2851cb2b0237710216039aa">soap_mq_next</a>(msg))</div><div class="line">    {</div><div class="line">      <span class="comment">// try next message in queue</span></div><div class="line">      soap_copy_context(&amp;ctx, &amp;msg-&gt;soap);</div><div class="line">      <span class="keywordflow">if</span> (!soap_begin_serve(&amp;ctx) &amp;&amp; !<a class="code" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb">soap_wsrm_check</a>(&amp;ctx))</div><div class="line">      {</div><div class="line">        <span class="comment">// check is OK, process message</span></div><div class="line">        soap_serve(&amp;msg-&gt;soap);</div><div class="line">        <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg);</div><div class="line">      }</div><div class="line">      <span class="keywordflow">else</span> <span class="keywordflow">if</span> (ctx.error != SOAP_STOP)</div><div class="line">        <a class="code" href="mq_8h.html#a85cbc8e97baa65dc3b3c6bc124213c27">soap_mq_del</a>(queue, msg);</div><div class="line">      soap_destroy(&amp;ctx);</div><div class="line">      soap_end(&amp;ctx);</div><div class="line">      soap_done(&amp;ctx);</div><div class="line">    }</div><div class="line">    sleep(1); <span class="comment">// sleep some before around we go again</span></div><div class="line">  }</div><div class="line">  <span class="keywordflow">return</span> NULL;</div><div class="line">}</div></div><!-- fragment --><p>In the first loop that runs over the messages received on the same keep-alive socket, the messages will be processed and services dispatched immediately for non-WS-RM messages and when the WS-RM check succeeds. This check is done in the server dispatch loop as shown, which means that WS-RM-based service operations SHOULD NOT call <a class="el" href="wsrmapi_8h.html#a0304e1219b99929fa576cf12114206eb" title="Receiver (server)-side check for the presence of WS-Addressing and WS-RM header blocks in the SOAP he...">soap_wsrm_check()</a> again. WS-RM messages that cannot be processed yet since they are out of the sequence order will remain in the queue.</p>
<p>The second loop over the queued messages will retry to dispatch service operations according to the WS-RM message order as required by WS-RM NoDiscard sequence behavior. The loop will run until the queue is empty or when the WS-RM sequences are closed/terminated. </p>
</div></div><!-- contents -->
<hr class="footer">
<address class="footer">
Copyright (C) 2018, Robert van Engelen, Genivia Inc., All Rights Reserved.
</address>
<address class="footer"><small>
Converted on Sun Dec 9 2018 16:27:25 by <a target="_blank" href="http://www.doxygen.org/index.html">Doxygen</a> 1.8.11</small></address>
<br>
<div style="height: 246px; background: #DBDBDB;">
</body>
</html>
